package com.xdl.apitest.tabletest

import com.xdl.apitest.SensorReading

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table, TableEnvironment}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, OldCsv, Schema}

/**
 * Project: FlinkTutorial
 * Package: com.xdl.apitest
 * Version: 1.0
 *
 * Created by guoxiaolong on 2020-08-01-15:21
 */
/**
 * Tutorial entry point for the Flink Table API (legacy planner era):
 * registers a file-backed table and a Kafka-backed table, runs a simple
 * projection/filter and an aggregation, and prints the filtered rows.
 */
object TableApiTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // single parallelism so printed output stays in order

    // 1. Create the table execution environment (default planner, streaming mode)
    val tableEnv = StreamTableEnvironment.create(env)
    /*
          // 1.1 Streaming query with the old planner
          val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
            .useOldPlanner()   // legacy planner
            .inStreamingMode() // streaming mode
            .build()
          val oldStreamTableEnv = StreamTableEnvironment.create(env, settings)

          // 1.2 Batch environment with the old planner
          // NOTE(review): BatchTableEnvironment.create expects a batch
          // ExecutionEnvironment, not a StreamExecutionEnvironment — confirm
          // against the Flink version in use.
          val batchEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
          val batchTableEnvironment: BatchTableEnvironment = BatchTableEnvironment.create(batchEnv)

          // 1.3 Blink-planner streaming environment
          val bsSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build()
          // fixed: originally passed `seetings` (the old-planner settings) by mistake
          val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)

          // 1.4 Blink-planner batch environment
          val bbSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inBatchMode()
            .build()
          val bbTableEnv = TableEnvironment.create(bbSettings)
       */

    // 2. Connect to external systems and register input tables

    // 2.1 Table backed by a local CSV file
    val filePath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt"

    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new OldCsv()) // how raw file content is parsed
      .withSchema(new Schema()  // fixed: was the non-existent method `withScahema`
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("inputTable") // register in the table environment

    // 2.2 Table backed by a Kafka topic
    tableEnv.connect(new Kafka() // fixed: descriptor class is `Kafka`, not `kafka`
      .version("0.11")                                 // connector version
      .topic("sensor")                                 // source topic
      .property("zookeeper.connect", "localhost:2181")
      .property("bootstrap.servers", "localhost:9092")
    )
      .withFormat(new Csv())
      .withSchema(new Schema() // fixed: was `withScahema`
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("KafkaInputTable")

    // 3. Query / transform the registered tables
    val sensorTable: Table = tableEnv.from("inputTable")

    // 3.1 Simple projection + filter (expression-based API throughout,
    // instead of mixing string-based select with expression-based filter)
    val resultTable: Table = sensorTable
      .select('id, 'temperature)
      .filter('id === "sensor_1") // fixed: was "snesor_1", which matches nothing

    // 3.2 Aggregation via the Table API
    val aggResultTable: Table = sensorTable
      .groupBy('id)
      .select('id, 'id.count as 'count)

    // Same aggregation expressed in SQL
    val aggResultSqlTable: Table =
      tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id")

    // Test output.
    //resultTable.toAppendStream[(String, Double)].print()
    // Retract stream: emits (true, row) on add and (false, row) on retraction.
    // fixed: resultTable projects only (id, temperature) => (String, Double);
    // the original (String, Long, Double) type parameter fails at runtime.
    resultTable.toRetractStream[(String, Double)].print()

    env.execute("table api test job")

  }

}
